
gh-47798: Add a subprocess.run_pipeline() API #142080

Draft
gpshead wants to merge 47 commits into python:main from
gpshead:claude/subprocess-pipe-chaining-01R27VPueru4RfRXYDsV5TmW

Conversation

@gpshead
Member

@gpshead gpshead commented Nov 29, 2025

This was a feature request from 2008.

Summary

Read the docs in the PR for details, but it basically mirrors the run() API, just with multiple commands:

>>> from subprocess import run_pipeline
>>> run_pipeline(
...     ["gh", "issue", "list", "--state", "open", "--search", "subprocess in:title",
...      "--json", "number", "--limit", "500"],
...     ["jq", "length"],
... )
97
CompletedPipeline(commands=(PipelineCommand(['gh', 'issue', 'list', '--state', 'open', '--search', 'subprocess in:title', '--json', 'number', '--limit', '500']), PipelineCommand(['jq', 'length'])), returncodes=(0, 0))

Why?

What started out as an exercise of "could I guide a modern model in Nov-2025 towards creating something long-term public-API worthy?" (yes, but it took a lot of guidance and handholding from me, because I am picky and this is CPython) wound up with me settling on this as a viable interface.

Deciding whether to pick it up again and proceed meant doing a survey of a large codebase at work. I found run_pipeline() would actually be a benefit if it existed:

  • NN hand-rolled Popen|Popen chains; ~70% have at least one of the three classic bugs (producer rc never checked, parent doesn't close the intermediate pipe end, producer never wait()ed). The correct ones carried over a dozen lines of tricky boilerplate.
  • NN*3 run() calls with shell=True or bash -c that exist only to get |, several bolting on a bash-specific set -o pipefail.
  • No in-house pipeline helpers and no use of PyPI shell-pipeline packages. People reach for shell=True or hand-roll (the PyPI offerings found all have caveats, see below).

run_pipeline(check=True) makes the correct behavior readily available.
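For contrast, a minimal hand-rolled two-stage pipeline that avoids all three classic bugs from the survey looks roughly like this (a sketch using `sys.executable -c` stand-ins for real commands):

```python
import subprocess
import sys

# Producer emits two lines; consumer counts the lines it receives.
producer = subprocess.Popen(
    [sys.executable, "-c", "print('alpha'); print('beta')"],
    stdout=subprocess.PIPE,
)
consumer = subprocess.Popen(
    [sys.executable, "-c", "import sys; print(len(sys.stdin.readlines()))"],
    stdin=producer.stdout,
    stdout=subprocess.PIPE,
)
# Bug 2 fix: close the parent's copy of the intermediate pipe end,
# otherwise the consumer never sees EOF.
producer.stdout.close()
out, _ = consumer.communicate()
producer.wait()  # Bug 3 fix: reap the producer; don't leave a zombie.
# Bug 1 fix: check *every* return code, not just the last command's.
for proc in (producer, consumer):
    if proc.returncode:
        raise subprocess.CalledProcessError(proc.returncode, proc.args)
print(out.decode().strip())
```

Every one of those steps is a chance to get it wrong; run_pipeline(check=True) folds all of it into one call.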

Why not do this as standalone PyPI package?

Good question. Yes actually, it looks easy to turn this into one. Pondering that now...

Design notes

Read the docs built from the PR. Anything decision-related that I don't think makes sense to cover directly in the docs or comments goes here.

CompletedPipeline and PipelineError are siblings of CompletedProcess and CalledProcessError under SubprocessError, not subclasses. Being plural, they don't share the same API shape, so a subclass relationship would have been confusing and awkward. For a similar reason, TimeoutExpired also inherits directly from SubprocessError.

Timeout handling matches run()'s behavior: SIGKILL each process. My codebase survey found no field evidence pushing toward a SIGPIPE-cascade-then-kill alternative. This is already better than shell=True here -- run("a|b", shell=True, timeout=...) kills only the shell and can orphan a and b, whereas run_pipeline() kills each command directly.
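The run() half of that comparison is observable today (a sketch; run() signals the child directly, and run_pipeline() is documented to do the same for every command):

```python
import subprocess
import sys

# run() kills the child itself on timeout rather than abandoning it.
timed_out = False
try:
    subprocess.run(
        [sys.executable, "-c", "import time; time.sleep(30)"],
        timeout=1,
    )
except subprocess.TimeoutExpired as exc:
    timed_out = True
    print("timed out after", exc.timeout, "second(s)")
```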

Out of scope "maybe someone could do later"

  • A new_process_group= boolean to place every command in one new process group (so os.killpg() reaches grandchildren too). Not needed for shell=True parity as non-interactive sh -c doesn't do job control either.
  • Per-command stderr=PIPE capture on PipelineCommand (implies a CompletedPipeline.stderrs list, or even a complex non-run-like API for dynamic control of multiple pipes). Rarely needed.
  • An easier-to-use Popen-level pipeline for streaming consumers (for line in last.stdout:). A few sites in my survey wanted this; run_pipeline() waits for completion. The run* APIs are synchronous by design and not meant for this.
  • An async variant is the single biggest blocker in async-heavy code, but belongs in the relevant async libraries, not in subprocess. Entirely separate.
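The streaming case from that list stays in Popen territory; a rough sketch of what such callers write today, with `sys.executable -c` stand-ins for real commands:

```python
import subprocess
import sys

# Stream the last stage's output line by line instead of waiting for
# the whole pipeline to finish (which run_pipeline() deliberately omits).
producer = subprocess.Popen(
    [sys.executable, "-c", "print('one'); print('two')"],
    stdout=subprocess.PIPE,
)
consumer = subprocess.Popen(
    [sys.executable, "-c",
     "import sys\nfor line in sys.stdin: print(line.strip().upper())"],
    stdin=producer.stdout,
    stdout=subprocess.PIPE,
    text=True,
)
producer.stdout.close()
lines = []
for line in consumer.stdout:  # consume incrementally as data arrives
    lines.append(line.strip())
consumer.wait()
producer.wait()
print(lines)
```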

Alternative ideas considered

I pondered a | pipe operator between objects, but that's unnatural for Popen instances themselves, since those start upon creation; you want processes started sequentially with the actual stdout->stdin chain wired up from the start, so a run-like API makes sense. It's a neat idea, kinda like pathlib and its use of /, worth considering in the future. Maybe it'd expand to combine with this PR's PipelineCommand interface? Likely over-engineering.

This lets people avoid using a shell. It does not offer raw Popen flexibility for I/O multiplexing, though you can pass your own file objects for input/output and feed/consume them from threads. Introducing threads to the subprocess module on a platform where there were not any so far (posix) was specifically a non-goal. Same for offering complicated I/O multiplexing in a public API.

PyPI prior art


📚 Documentation preview 📚: https://cpython-previews--142080.org.readthedocs.build/

gpshead and others added 12 commits November 29, 2025 08:04
Add a new run_pipeline() function to the subprocess module that enables
running multiple commands connected via pipes, similar to shell pipelines.

New API:
- run_pipeline(*commands, ...) - Run a pipeline of commands
- PipelineResult - Return type with commands, returncodes, stdout, stderr
- PipelineError - Raised when check=True and any command fails

Features:
- Supports arbitrary number of commands (minimum 2)
- capture_output, input, timeout, and check parameters like run()
- stdin= connects to first process, stdout= connects to last process
- Text mode support via text=True, encoding, errors
- All processes share a single stderr pipe for simplicity
- "pipefail" semantics: check=True fails if any command fails

Unlike run(), this function does not accept universal_newlines.
Use text=True instead.

Example:
    result = subprocess.run_pipeline(
        ['cat', 'file.txt'],
        ['grep', 'pattern'],
        ['wc', '-l'],
        capture_output=True, text=True
    )

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Document the new run_pipeline() function, PipelineResult class, and
PipelineError exception in the subprocess module documentation.

Includes:
- Function signature with stdin, stdout, stderr, capture_output, etc.
- Note about shared stderr pipe and text mode caveat for interleaved
  multi-byte character sequences
- Note that universal_newlines is not supported (use text=True)
- Explanation that stdin connects to first process, stdout to last
- Usage examples showing basic pipelines, multi-command pipelines,
  input handling, and error handling with check=True
- PipelineResult attributes: commands, returncodes, returncode,
  stdout, stderr, and check_returncodes() method
- PipelineError attributes: commands, returncodes, stdout, stderr,
  and failed list

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Add _communicate_streams() helper function that properly multiplexes
read/write operations to prevent pipe buffer deadlocks. The helper
uses selectors on POSIX and threads on Windows, similar to
Popen.communicate().

This fixes potential deadlocks when large amounts of data flow through
the pipeline and significantly improves performance.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Add three tests that verify the multiplexed I/O implementation
properly handles large data volumes that would otherwise cause
pipe buffer deadlocks:

- test_pipeline_large_data_no_deadlock: 256KB through 2-stage pipeline
- test_pipeline_large_data_three_stages: 128KB through 3-stage pipeline
- test_pipeline_large_data_with_stderr: 64KB with concurrent stderr

These tests would timeout or deadlock without proper multiplexing.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Remove support for raw file descriptors in _communicate_streams(),
requiring all streams to be file objects. This simplifies both the
Windows and POSIX implementations by removing isinstance() checks
and fd-wrapping logic.

The run_pipeline() function now wraps the stderr pipe's read end
with os.fdopen() immediately after creation.

This change makes _communicate_streams() more compatible with
Popen.communicate() which already uses file objects, enabling
potential future refactoring to share the multiplexed I/O logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Update the test to write 64KB to stderr from each process (128KB total)
instead of just small status messages. This better tests that the
multiplexed I/O handles concurrent large data on both stdout and stderr
without deadlocking.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
The comment suggested rewriting Popen._communicate() to use
non-blocking I/O on file objects now that Python 3's io module
is used instead of C stdio.

This is unnecessary - the current approach using select() to
detect ready fds followed by os.read()/os.write() is correct
and efficient. The selector already solves "when is data ready?"
so non-blocking mode would add complexity with no benefit.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Extract the core selector-based I/O loop into a new _communicate_io_posix()
function that is shared by both _communicate_streams_posix() (used by
run_pipeline) and Popen._communicate() (used by Popen.communicate).

The new function:
- Takes a pre-configured selector and output buffers
- Supports resume via input_offset parameter (for Popen timeout retry)
- Returns (new_offset, completed) instead of raising TimeoutExpired
- Does not close streams (caller decides based on use case)

This reduces code duplication and ensures both APIs use the same
well-tested I/O multiplexing logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Move stdin writing to a background thread in _communicate_streams_windows
to avoid blocking indefinitely when writing large input to a pipeline
where the subprocess doesn't consume stdin quickly.

This mirrors the fix made to Popen._communicate() for Windows in
commit 5b1862b (pythongh-87512).

Add test_pipeline_timeout_large_input to verify that TimeoutExpired
is raised promptly when run_pipeline() is called with large input
and a timeout, even when the first process is slow to consume stdin.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Apply the same fixes from Popen._communicate() to _communicate_streams_posix
for run_pipeline():

1. Handle non-byte memoryview input by casting to byte view (pythongh-134453):
   Non-byte memoryviews (e.g., int32 arrays) had incorrect length tracking
   because len() returns element count, not byte count. Now cast to "b"
   view for correct progress tracking.

2. Handle ValueError on stdin.flush() when stdin is closed (pythongh-74389):
   Ignore ValueError from flush() if stdin is already closed, matching
   the BrokenPipeError handling.

Add tests for memoryview input to run_pipeline:
- test_pipeline_memoryview_input: basic byte memoryview
- test_pipeline_memoryview_input_nonbyte: int32 array memoryview

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
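The length-tracking pitfall behind pythongh-134453 is easy to see in isolation (a sketch with an int32 array as the non-byte input):

```python
import array

data = array.array("i", [1, 2, 3])
view = memoryview(data)
# len() of a non-byte memoryview counts elements, not bytes, so using
# it to track how much input remains to be written would undercount.
print(len(view))              # 3 elements
byte_view = view.cast("b")    # cast to a byte view, as the fix does
print(len(byte_view))         # element count * itemsize (12 where ints are 4 bytes)
```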
Extract common stdin preparation logic into shared helper functions
used by both _communicate_streams_posix() and Popen._communicate():

- _flush_stdin(stdin): Flush stdin, ignoring BrokenPipeError and
  ValueError (for closed files)

- _make_input_view(input_data): Convert input data to a byte memoryview,
  handling non-byte memoryviews by casting to "b" view

This ensures consistent behavior and makes the fixes for pythongh-134453
(memoryview) and pythongh-74389 (closed stdin) shared in one place.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
- Factor out _translate_newlines() as a module-level function, have
  Popen's method delegate to it for code sharing
- Remove rejection of universal_newlines kwarg in run_pipeline(), treat
  it the same as text=True (consistent with Popen behavior)
- Use _translate_newlines() for text mode decoding in run_pipeline()
  to properly handle \r\n and \r newline sequences
- Update documentation to remove mention of universal_newlines rejection
- Update test to verify universal_newlines=True works like text=True

Co-authored-by: Claude <noreply@anthropic.com>
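The equivalence this commit relies on can be checked directly against run() today (a sketch; universal_newlines is simply the legacy spelling of text):

```python
import subprocess
import sys

cmd = [sys.executable, "-c", "print('hello')"]
a = subprocess.run(cmd, capture_output=True, text=True)
b = subprocess.run(cmd, capture_output=True, universal_newlines=True)
# Both decode stdout and translate newlines identically.
print(a.stdout == b.stdout, repr(a.stdout))
```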
@gpshead gpshead added type-feature A feature request or enhancement stdlib Standard Library Python modules in the Lib/ directory topic-subprocess Subprocess issues. labels Nov 29, 2025
@gpshead gpshead self-assigned this Nov 29, 2025
Comment thread Doc/library/subprocess.rst Outdated
@merwok

This comment was marked as off-topic.

gpshead and others added 11 commits April 25, 2026 12:27
…e-chaining-01R27VPueru4RfRXYDsV5TmW

# Conflicts:
#	Lib/subprocess.py
text=/universal_newlines=/encoding=/errors= were forwarded to each per-
command Popen, which wrapped parent-side pipes in TextIOWrapper. The
threaded Windows _communicate_streams_* backend does fh.write(bytes) and
fh.read()->bytes and so failed with TypeError/AttributeError. POSIX uses
fd-level os.read/os.write and silently tolerated the mismatch.

Pop those kwargs in run_pipeline and handle encoding at the pipeline
boundary as already intended. Every parent-side pipe now stays binary,
matching the documented _communicate_streams contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…_pipeline

When run_pipeline() captured stderr but not stdout (e.g.
stdout=DEVNULL with stderr=PIPE), a timeout would surface
TimeoutExpired with stderr bytes in the .output field, because
_communicate_streams used read_streams[0] regardless of which
stream it actually was. Pass stdout and stderr explicitly to the
helper and populate both TimeoutExpired.output and .stderr.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
stdin=PIPE without input= leaves a writable pipe owned by the parent
that nobody writes or closes, so the first child blocks reading stdin
forever. There is no useful semantic for this combination in
run_pipeline (callers wanting to feed input use input=, callers wanting
a file/fd pass it directly). Reject it explicitly with ValueError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
If close_fds=False is forwarded to each Popen, every child inherits
copies of all the other children's pipe ends. Closing a write end in
the parent then no longer signals EOF to the reader because other
children still hold a copy open, leading to deadlocks. Reject
explicit close_fds=False with ValueError; the default close_fds=True
behavior is what works.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- errors=None default matches Popen convention (was 'strict', a
  divergence that bypassed the TextIOWrapper/bytes.decode default).
- PipelineError now calls super().__init__(commands, returncodes)
  so e.args is populated, fixing repr() and pickle.
- Drop the dead `if self.returncodes else None` fallback in
  PipelineResult.returncode; returncodes is always populated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cleanup `finally` block was killing and waiting on each child in
turn, so a single hung wait() would leave later children un-killed.
Match the kill-all-then-wait-all pattern already used by the timeout
cleanup paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Rename _remaining_time_helper to _deadline_remaining.
- Note that Popen._translate_newlines remains a method for subclass
  back-compat (logic moved to a module-level function).
- Cap PipelineError.__str__ at three failures with "and N more" so a
  long failed pipeline doesn't produce an unwieldy message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
stderr=STDOUT redirects each child's stderr to its own stdout fd, so
non-final processes route stderr into the next process's stdin -
surprising for callers expecting shell-like 2>&1 to the pipeline's
final stdout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add tests for the check=True happy path, stderr=STDOUT routing of
the final process's stderr, the intermediate-stdout-closed-in-parent
contract, and pickle/repr round-tripping of PipelineError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
For consistency with CompletedProcess, returned by subprocess.run().
The error type PipelineError stays, paralleling CalledProcessError /
CompletedProcess in the existing module.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
gpshead and others added 3 commits April 25, 2026 19:03
…timeout

Strengthen test_pipeline_timeout to assert that TimeoutExpired.output
and TimeoutExpired.stderr are either None or bytes when a pipeline
times out mid-flight. Both backends (POSIX selector and Windows
threaded) populate these attributes from any partial reads, so the
assertion is meaningful on every platform CI runs on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop section-label and restate-the-code comments added with run_pipeline
and its helpers, and reframe the remaining ones around the invariant
they document (pipe-EOF on parent close, drain-writer-before-readers,
multiplexing prevents buffer-fill deadlocks, _input_offset persists for
resume) so future readers get the why, not a narration of the code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ion point

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@gpshead gpshead force-pushed the claude/subprocess-pipe-chaining-01R27VPueru4RfRXYDsV5TmW branch from f31429f to 86a1b19 Compare April 26, 2026 02:03
gpshead added 11 commits April 26, 2026 04:12
…ams on Windows

_communicate_io_posix referenced the POSIX-only _PIPE_BUF from
unconditional module scope; move it into the non-Windows branch
alongside _communicate_streams_posix.

_communicate_streams_windows now closes each read stream once its
reader thread has joined, matching the POSIX implementation and the
helper's documented contract.
…ntry cap

Negative returncodes on POSIX mean the child was killed by a signal;
report them as "died with <Signals.SIGFOO>" rather than a bare negative
integer, matching CalledProcessError.

Also drop the 3-entry truncation: real pipelines are short enough that
more than three failing stages is rare, and when it happens the
traceback is exactly where a user wants to see every failure rather
than "and N more".  The full list remains on .failed regardless.
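The negative-returncode convention rendered here is existing Popen behavior on POSIX (a sketch; on Windows return codes are not signal-encoded):

```python
import signal
import subprocess
import sys

# On POSIX, a child killed by a signal reports -signum as its returncode.
proc = subprocess.Popen(
    [sys.executable,
     "-c", "import os, signal; os.kill(os.getpid(), signal.SIGTERM)"]
)
proc.wait()
print(proc.returncode)  # negative on POSIX: killed by a signal
if proc.returncode < 0:
    print("died with", signal.Signals(-proc.returncode).name)
```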
…skip timeout

Hoist array and pickle to module-level imports rather than importing
inside test methods.  Correct the intermediate-stdout-close docstring
(the producer hits a broken pipe, not EOF).  Drop the Windows skip on
test_pipeline_timeout: the body is platform-neutral and exercises the
threaded backend's TimeoutExpired path too.
…awn failure

test_pipeline_error_str_signal covers the negative-returncode rendering
in PipelineError.__str__.

test_pipeline_spawn_failure_cleans_up exercises the run_pipeline
finally-block cleanup when a later stage fails to exec: stage 0 is
already running and sleeping, stage 1's executable does not exist, and
the call must return promptly with the OSError rather than hang on
stage 0.
The pipeline replaces the shell; per-stage shell would re-introduce the
quoting and injection surface this API exists to avoid.  A future
Stage() wrapper is the place for the rare stage that needs it.
…w entry

The "Replacing shell pipeline" recipe now recommends run_pipeline()
first and demotes the manual Popen chain to the streaming case.  Note
that PipelineError is a sibling of CalledProcessError, not a subclass.
…peline overrides

PipelineCommand(args, /, *, stderr=, env=, cwd=, shell=) wraps one
command with overrides; run_pipeline() normalizes every positional to
this type, so .commands is always uniform and a bare-str positional is
rejected before any process spawns.

CompletedPipeline and PipelineError hold .commands and .returncodes as
tuples.  Drop the singular CompletedPipeline.returncode property: it
is the non-pipefail last-command code that the API exists to steer
callers away from; returncodes[-1] is the explicit spelling.

Consolidate the run_pipeline stderr docs into one rubric and align
prose on "command".
…e-level stderr=STDOUT

start_new_session and process_group: each command is spawned as a
sibling child of the calling process, so applying these per command
yields N separate sessions/groups rather than a single group spanning
the pipeline.

stderr=STDOUT at the pipeline level: would merge each non-final
command's stderr into the next command's stdin.  Per-command
PipelineCommand(stderr=STDOUT) covers the legitimate use.

Leave a note at Popen.__init__ reminding future kwarg additions to
consider run_pipeline forwarding.
…utput

Annotate spawn-time OSError with an exception note naming the
PipelineCommand and its index, so a FileNotFoundError mid-pipeline
tells the caller which command failed; the exception type is unchanged.

Reformat PipelineError.__str__ as "argv (commands[i]) detail" so the
argv leads and the index is unambiguously a Python list subscript.
… doc examples

Style-only pass over the pipeline-related code, tests, and doc
examples.  Single quotes kept where the literal contains a " (to avoid
escaping) and in repr-output examples (Python's repr uses ').
@gpshead gpshead added the 🔨 test-with-buildbots Test PR w/ buildbots; report in status section label Apr 26, 2026
@bedevere-bot

This comment was marked as outdated.

@bedevere-bot bedevere-bot removed the 🔨 test-with-buildbots Test PR w/ buildbots; report in status section label Apr 26, 2026
Comment thread Doc/library/subprocess.rst Outdated
gpshead added 3 commits April 27, 2026 00:59
…ipe-chaining-01R27VPueru4RfRXYDsV5TmW

# Conflicts:
#	Lib/subprocess.py
…e subsections

Give run() and run_pipeline() their own headings so they appear in
the page sidebar, move the run_pipeline block after the Popen
constructor section, mention run_pipeline in the intro paragraph and
Exceptions section, and split DEVNULL/PIPE/STDOUT and the base
exception classes into a "Constants and base exceptions" subsection
so they are not nested under the run heading in the TOC.
…ut reading

In tests where the second command exits immediately (sys.exit(N))
without reading stdin, the first command's stdout flush during
interpreter shutdown can hit a readerless pipe and yield exit code
120, breaking assertions on returncodes[0] == 0.  Seen on Windows
free-threading and FreeBSD refleak buildbots.  The first command's
output is unused in these tests; switch it to "pass".
@gpshead gpshead added the 🔨 test-with-buildbots Test PR w/ buildbots; report in status section label Apr 27, 2026
@bedevere-bot

🤖 New build scheduled with the buildbot fleet by @gpshead for commit 03c2da4 🤖

Results will be shown at:

https://buildbot.python.org/all/#/grid?branch=refs%2Fpull%2F142080%2Fmerge

If you want to schedule another build, you need to add the 🔨 test-with-buildbots label again.

@bedevere-bot bedevere-bot removed the 🔨 test-with-buildbots Test PR w/ buildbots; report in status section label Apr 27, 2026
@gpshead gpshead marked this pull request as ready for review April 27, 2026 01:52
@gpshead gpshead requested a review from AA-Turner as a code owner April 27, 2026 01:52
@gpshead gpshead marked this pull request as draft April 27, 2026 04:31